{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "2227e466-f9e4-4882-9a21-da2b1824b301",
   "metadata": {},
   "source": [
    "# Generate Python UDFs for different cases"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "86b69eae-351d-45f4-ac16-f3bd8eb2bd42",
   "metadata": {},
   "source": [
    "## Initialization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7519a9c-0657-4aac-b3da-0e3e41995fdb",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark_ai import SparkAI\n",
    "\n",
    "spark_ai = SparkAI(verbose=True)\n",
    "spark_ai.activate()  # active partial functions for Spark DataFrame\n",
    "spark = SparkSession.builder.getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d1a67d8e-6205-4b94-a6db-a3acf7c3f2e8",
   "metadata": {},
   "source": [
    "## Example 1: parsing heterogeneous JSON text\n",
    "\n",
    "It is a common problem when we are getting data in the from of JSON text. We know expected schema of JSON, but there is no guarantees about fields order and even missing keys are possible by the contract. Built-int spark functions are not well suited for such a case because `from_json` expected strict schema. Sometimes it is simpler to resolve such a problem with [PythonUDF](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html). With `pyspark-ai` we can simplify the procoess of creation such a function."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f8a2172-0a62-4d39-95a3-68f42256a948",
   "metadata": {},
   "source": [
    "### Generation of Data\n",
    "\n",
    "But at first we need to generate a test sample of data. Let's do it by mutatin one single JSON."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5cc4204a-64f4-4d21-8b52-bfa5c1c500a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "random_dict = {\n",
    "    \"id\": 1279,\n",
    "    \"first_name\": \"John\",\n",
    "    \"last_name\": \"Doe\",\n",
    "    \"username\": \"johndoe\",\n",
    "    \"email\": \"john_doe@example.com\",\n",
    "    \"phone_number\": \"+1 234 567 8900\",\n",
    "    \"address\": \"123 Main St, Springfield, OH, 45503, USA\",\n",
    "    \"age\": 32,\n",
    "    \"registration_date\": \"2020-01-20T12:12:12Z\",\n",
    "    \"last_login\": \"2022-03-21T07:25:34Z\",\n",
    "}\n",
    "original_keys = list(random_dict.keys())\n",
    "\n",
    "from random import random, shuffle\n",
    "# Generate 20 mutated version of this dictionary\n",
    "mutaded_rows = []\n",
    "for _ in range(20):\n",
    "    keys = [k for k in original_keys]\n",
    "    shuffle(keys)\n",
    "    # With 0.4 chance drop each field and also shuffle an order\n",
    "    mutaded_rows.append({k: random_dict[k] for k in keys if random() <= 0.6})\n",
    "\n",
    "import json\n",
    "bad_json_dataframe = (\n",
    "    spark.createDataFrame(\n",
    "        [(json.dumps(val), original_keys) for val in mutaded_rows],\n",
    "        [\"json_field\", \"schema\"],\n",
    "    )\n",
    ")\n",
    "\n",
    "bad_json_dataframe.sample(0.5).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8485d712-6dc1-4071-aaec-141a3f675d5b",
   "metadata": {},
   "source": [
    "### Generate UDF function code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "44f15de8-8008-46dc-99bc-072667187219",
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import List\n",
    "\n",
    "@spark_ai.udf\n",
    "def parse_heterogeneous_json(json_str: str, schema: List[str]) -> List[str]:\n",
    "    \"\"\"Extract fields from heterogeneous JSON string based on given schema in a right order.\n",
    "    If field is missing replace it by None. All imports should be inside function.\"\"\"\n",
    "    ..."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b07775dd-e8c3-465f-ab97-4ba87933fbc1",
   "metadata": {},
   "source": [
    "It looks like `pyspark-ai` generate us a valid function. It iterate over expected schema and try to find such a field in given JSON string. If the key is missing it will return None. Also `pyspark-ai` added a neccessary import of `json` module from the python standard library."
   ]
  },
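  {
   "cell_type": "markdown",
   "id": "sketch-parse-json-note",
   "metadata": {},
   "source": [
    "For comparison, here is a hand-written sketch of the logic described above. It is not the code produced by `pyspark-ai` (generated code can differ between runs); the name `parse_heterogeneous_json_sketch` and the choice to cast present values to strings are assumptions made for illustration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "sketch-parse-json-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from typing import List, Optional\n",
    "\n",
    "\n",
    "def parse_heterogeneous_json_sketch(json_str: str, schema: List[str]) -> List[Optional[str]]:\n",
    "    \"\"\"Illustrative only: return the schema fields in order, None where a key is absent.\"\"\"\n",
    "    try:\n",
    "        data = json.loads(json_str)\n",
    "    except (TypeError, ValueError):\n",
    "        # Unparseable or null input: treat every field as missing\n",
    "        return [None] * len(schema)\n",
    "    if not isinstance(data, dict):\n",
    "        return [None] * len(schema)\n",
    "    # Cast present values to str so the result fits array<string>\n",
    "    return [str(data[key]) if data.get(key) is not None else None for key in schema]\n",
    "\n",
    "\n",
    "# Keys arrive out of order and \"first_name\" is missing\n",
    "parse_heterogeneous_json_sketch('{\"age\": 32, \"id\": 1279}', [\"id\", \"first_name\", \"age\"])"
   ]
  },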
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4d3f29b6-74e4-4e57-ab86-0ce97df0bf82",
   "metadata": {},
   "outputs": [],
   "source": [
    "### Testing our UDF\n",
    "\n",
    "from pyspark.sql.functions import expr\n",
    "from pyspark.sql.types import ArrayType, StringType\n",
    "\n",
    "# Our UDF should return array<string>\n",
    "spark.udf.register(\"parse_heterogeneous_json\", parse_heterogeneous_json, returnType=ArrayType(elementType=StringType()))\n",
    "\n",
    "(\n",
    "    bad_json_dataframe\n",
    "    .withColumn(\"parsed\", expr(\"parse_heterogeneous_json(json_field, schema)\"))\n",
    "    .sample(0.5)\n",
    "    .toPandas()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3475153e-0d10-4af0-93c0-9caa7b16c80f",
   "metadata": {},
   "source": [
    "## Example 2: Extract email from text"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "462da8ea-ceb0-4b9c-b191-d6c7e4922a93",
   "metadata": {},
   "source": [
    "### Generating data\n",
    "\n",
    "Lets creaete a DataFrame with raw text that contains email."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "48af4a4b-cd9d-4d2f-a4f4-2596ee682a66",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame(\n",
    "    [\n",
    "        \"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed egestas nulla sit amet elit volutpat ultricies. Morbi lacinia est fringilla pulvinar elementum. Curabitur rhoncus luctus dui, sodales blandit arcu maximus a. Aenean iaculis nulla ac enim tincidunt, et tristique enim bibendum. Fusce mollis nibh sit amet nisi pellentesque egestas. Quisque volutpat, neque eu semper tristique, odio nunc auctor odio, at condimentum lorem nunc nec nisi. Quisque auctor at velit nec fermentum. Nunc id pellentesque erat, et dignissim felis. ali.brown@gmail.com Suspendisse potenti. Donec tincidunt enim in ipsum faucibus sollicitudin. Sed placerat tempor eros at blandit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec aliquam velit vehicula massa egestas faucibus. Ut pulvinar mi id pretium dignissim. Phasellus vehicula, dui sit amet porttitor effectively maximizes an attacker's chance to obtain valid credentials. Sed malesuada justo enim, et interdum mauris ullamcorper ac.\",\n",
    "        \"Vestibulum rhoncus magna semper est lobortis gravida. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. In hac habitasse platea dictumst. michael.hobb@gmail.com Aenean sapien magna, consequat vitae pretium ac, gravida sit amet nibh. Maecenas lacinia orci in luctus placerat. Praesent lobortis turpis nec risus dapibus, eget ornare mi egestas. Nam eget dui ac mi laoreet sagittis. Integer condimentum est ac velit volutpat pharetra. Nulla facilisi. Nunc euismod, neque vitae porttitor maximus, justo dui efficitur ligula, vitae tincidunt erat neque ac nibh. Duis eu dui in erat blandit mattis.\",\n",
    "        \"Aenean vitae iaculis odio. Donec laoreet non urna sed posuere. Nulla vitae orci finibus, convallis mauris nec, mattis augue. Proin bibendum non justo non scelerisque. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Aenean scott_p@ymail.com adipiscing diam eget ultrices ultricies. Aliquam bibendum dolor vel orci posuere, sed pulvinar enim rutrum. Nulla facilisi. Sed cursus justo sed velit pharetra auctor. Suspendisse facilisis nibh id nibh ultrices luctus.\",\n",
    "        \"Quisque varius erat sed leo ornare, et elementum leo interdum. Aliquam erat volutpat. Ut laoreet tempus elit quis venenatis. Integer porta, lorem ut pretium luctus, erika.23@hotmail.com quis ipsum facilisis, feugiat libero sed, malesuada augue. Fusce id elementum sapien, sed SC ingeniously maximizes the chance to obtain valid credentials. Nullam imperdiet felis in metus semper ultrices. Integer vel quam consectetur, lobortis est vitae, lobortis sem. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos.\",\n",
    "        \"Sed consectetur nisl quis mauris laoreet posuere. Phasellus in elementum orci, vitae auctor dui. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Donec eleifend mauris id auctor blandit. john.smith@protonmail.com Integer quis justo non eros convallis aliquet cursus eu dolor. Praesent nec sem a massa facilisis consectetur. Nunc pharetra sapien non erat semper, ut tempus risus vulputate. Donec lacinia condimentum arcu, ac molestie metus interdum in. Duis arcu quam, hendrerit quis venenatis sed, porta at erat.\",\n",
    "    ],\n",
    "    schema=\"string\",\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5cba2b98-469e-4947-ab00-533904a2fb21",
   "metadata": {},
   "source": [
    "### Generate UDF function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b75056b9-c83e-4994-9bd6-6ac2483adbce",
   "metadata": {},
   "outputs": [],
   "source": [
    "@spark_ai.udf\n",
    "def extract_email(text: str) -> str:\n",
    "    \"\"\"Extract first email from raw text\"\"\"\n",
    "    ..."
   ]
  },
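  {
   "cell_type": "markdown",
   "id": "sketch-extract-email-note",
   "metadata": {},
   "source": [
    "A hand-written sketch of what such a function could look like, for comparison with whatever `pyspark-ai` generates. The name `extract_email_sketch` and the simplified regular expression are assumptions; the pattern covers common addresses like the ones in the sample data, not the full address grammar."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "sketch-extract-email-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "from typing import Optional\n",
    "\n",
    "\n",
    "def extract_email_sketch(text: str) -> Optional[str]:\n",
    "    \"\"\"Illustrative only: return the first email-like token in text, or None.\"\"\"\n",
    "    # Simplified pattern (an assumption): local part, '@', domain, dot, TLD of 2+ letters\n",
    "    match = re.search(r\"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\", text or \"\")\n",
    "    return match.group(0) if match else None\n",
    "\n",
    "\n",
    "extract_email_sketch(\"Nunc id erat. ali.brown@gmail.com Suspendisse potenti.\")"
   ]
  },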
  {
   "cell_type": "markdown",
   "id": "81090e1b-5913-409e-93ac-666f36a90d17",
   "metadata": {},
   "source": [
    "### Testing our UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3ea613a2-e5bd-4b9f-9ebf-adf6e0175303",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.udf.register(\"extract_email\", extract_email)\n",
    "df.withColumn(\"value\", expr(\"extract_email(value)\")).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c33cf61c-3eef-4189-8c1b-de37567327c3",
   "metadata": {},
   "source": [
    "## Example 3: random number from Laplace distribution\n",
    "\n",
    "Random numbers from the [Laplace](https://en.wikipedia.org/wiki/Laplace_distribution) distribution is one of key components of [Differential Privacy](https://en.wikipedia.org/wiki/Differential_privacy#The_Laplace_mechanism). Unfortunately spark do not contain built in routine for such a task. Let's create a UDF that generate numbers from Laplace distribution."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e8fbe1f1-022f-45f4-84fd-7ea8cb032e39",
   "metadata": {},
   "source": [
    "### Genrating UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4775749a-c53a-4b09-9055-651f044099e0",
   "metadata": {},
   "outputs": [],
   "source": [
    "@spark_ai.udf\n",
    "def laplace_random_number(loc: float, scale: float) -> float:\n",
    "    \"\"\"Generate a random number from Laplace distribution with given loc and scale in pure Python. Function should contain all necessary imports.\"\"\"\n",
    "    ..."
   ]
  },
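  {
   "cell_type": "markdown",
   "id": "sketch-laplace-note",
   "metadata": {},
   "source": [
    "One way to sample from the Laplace distribution in pure Python is inverse transform sampling: draw `u` uniformly from (-0.5, 0.5) and return `loc - scale * sgn(u) * ln(1 - 2|u|)`. The cell below is a hand-written sketch of that approach (`laplace_random_number_sketch` is a hypothetical name); the function generated by `pyspark-ai` may use a different method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "sketch-laplace-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "import math\n",
    "import random\n",
    "\n",
    "\n",
    "def laplace_random_number_sketch(loc: float, scale: float) -> float:\n",
    "    \"\"\"Illustrative only: one Laplace(loc, scale) sample via inverse transform sampling.\"\"\"\n",
    "    u = random.random() - 0.5  # uniform on [-0.5, 0.5)\n",
    "    while u == -0.5:  # resample the endpoint to avoid log(0)\n",
    "        u = random.random() - 0.5\n",
    "    return loc - scale * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))\n",
    "\n",
    "\n",
    "samples = [laplace_random_number_sketch(1.0, 0.3) for _ in range(100_000)]\n",
    "# The sample mean should be close to loc, and the mean absolute deviation close to scale\n",
    "sum(samples) / len(samples), sum(abs(x - 1.0) for x in samples) / len(samples)"
   ]
  },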
  {
   "cell_type": "markdown",
   "id": "3d44cdd1-0fa8-48d9-83e2-86eaa52965da",
   "metadata": {},
   "source": [
    "### Testing UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4669cb90-3ff6-495e-bcb9-a77ed9105135",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import lit\n",
    "from pyspark.sql.types import DoubleType\n",
    "\n",
    "spark.udf.register(\"laplace_random_number\", laplace_random_number, returnType=DoubleType())\n",
    "results = (\n",
    "    spark.sparkContext.range(0, 500_000)\n",
    "    .toDF(schema=\"int\")\n",
    "    .withColumn(\"loc\", lit(1.0).cast(\"double\"))\n",
    "    .withColumn(\"scale\", lit(0.3).cast(\"double\"))\n",
    "    .withColumn(\"laplace_random\", expr(\"laplace_random_number(loc, scale)\"))\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6009da97-812f-4919-9c52-88b708058788",
   "metadata": {},
   "source": [
    "We can use `numpy` to check our results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8b7180e8-de84-4b14-83e6-254ff5ef03f8",
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "numpy_random_numbers = np.random.laplace(1.0, 0.3, 500_000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7b079350-86bc-42dd-b718-13bd5b4bede9",
   "metadata": {},
   "outputs": [],
   "source": [
    "quantiles = results.ai.transform(\"Cumpute 10 quantiles of 'laplace_random' column\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "36692dc6-6d2b-4506-8a8d-31cac13fe8fc",
   "metadata": {},
   "outputs": [],
   "source": [
    "row = quantiles.collect()[0]\n",
    "spark_ai_quantiles = [row[f\"Q{n}\"] for n in range(1, 11)]\n",
    "numpy_quantiles = np.quantile(numpy_random_numbers, [x / 10.0 for x in range(1, 11)])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2de9665f-6df2-4fca-b8bd-c2cd80713e2b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "pd.DataFrame({\"spark\": spark_ai_quantiles, \"numpy\": numpy_quantiles})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8d26edd9-293e-4eef-8fe7-ba60a1113f39",
   "metadata": {},
   "source": [
    "We can see that our result is very close to results from NumPy builtin function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "714ff9f3-0360-473b-bf2c-d8ee9ed6d2ed",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
